package io.feige.rpc.producer.network.handler;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import io.feige.rpc.producer.RpcProducerService;
import io.feige.rpc.protocol.nettyobj.pojo.IORequestMessage;
import io.feige.rpc.protocol.nettyobj.pojo.IOResponseMessage;
import io.feige.rpc.protocol.nettyobj.pojo.Ping;
import io.feige.rpc.protocol.nettyobj.pojo.Pong;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 

public class ServerInvokerHandler extends SimpleChannelHandler {
	
	private static Logger logger = LoggerFactory.getLogger(ServerInvokerHandler.class);
	
	private Queue<Object> responseQueue=new ConcurrentLinkedQueue<Object>();
	
	private RpcProducerService rpcProducerService;
	
	public ServerInvokerHandler(RpcProducerService rpcProducerService) {
		this.rpcProducerService=rpcProducerService;
	}

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
 
		if (e.getMessage() instanceof Ping) {
			logger.debug("received ping packet from {}", e.getChannel().getRemoteAddress());
			e.getChannel().write(new Pong()); 
		}else{ 
			final IORequestMessage request=(IORequestMessage) e.getMessage();
			final Channel ch=e.getChannel(); 
			rpcProducerService.execute(new Runnable() {
				
				public void run() {  
					IOResponseMessage res=new IOResponseMessage();
					res.setSeq(request.getSeq());
					try {
						Object result=rpcProducerService.invoke(request.getService(), request.getMethod(), request.getTypes(), request.getArgs());
						res.setResult(result);
					} catch (Exception e) {
						res.setException(e);
						logger.warn("service invoke error", e); 
					}
					if (ch.isWritable()) {
						ch.write(res); 
					}else{
						responseQueue.offer(res);
					}
				}
			});
		}
        super.messageReceived(ctx, e); 
	}
	
	@Override
	public void channelInterestChanged(ChannelHandlerContext ctx,
			ChannelStateEvent e) throws Exception {
		Channel channel=e.getChannel();
		Object res;
		while((res=responseQueue.poll())!=null){
			if (channel.isWritable()) {
				channel.write(res);
			}else{
				responseQueue.offer(res);
				break;
			}
		}
		super.channelInterestChanged(ctx, e);
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
			throws Exception {
		e.getCause().printStackTrace();
		logger.warn("exceptionCaught:{}", e.getCause().getMessage());
	}
	
}
